import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;

import javax.annotation.Nullable;


public class WaterMark_disorder {


    public static void main(String[] args) throws Exception
    {


    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<String> dataStream = env
            .socketTextStream("Desktop", 3456).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
                long currentTimeStamp = 0L;
                long maxDelayAllowed = 5000L;
                long currentWaterMark;

                @Override
                public Watermark getCurrentWatermark() {
                    currentWaterMark = currentTimeStamp - maxDelayAllowed;
                    System.out.println("当前水位线:" + currentWaterMark);
                    return new Watermark(currentWaterMark);
                }

                @Override
                public long extractTimestamp(String s, long l) {
                    String[] arr = s.split(",");
                    long timeStamp = Long.parseLong(arr[1]);
                    currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                    System.out.println("Key:" + arr[0] + ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
                    return timeStamp;
                }

            });
//    在设置WaterMark方法中，先调用extractTimestamp方法，再调用getCurrentWatermark方法

        dataStream.map(new MapFunction<String, Tuple2<String, String>>()

    {
        @Override
        public Tuple2<String, String> map (String s) throws Exception {
        return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]);
    }
    }).

    keyBy(0)
                .

    window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .

    fold("Start:",new FoldFunction<Tuple2<String, String>, String>()
    {
        @Override
        public String fold (String s, Tuple2 < String, String > o) throws Exception
        {
            return s + " - " +o.f1;
        }
    }).

    print();

        env.execute("WaterMark Test Demo");
}
}



/*
代码来自:
http://www.louisvv.com/archives/2225.html
*/